package com.atguigu.flink.chapter02_DataStreamAPI.source;

import com.alibaba.fastjson.JSON;
import com.atguigu.flink.pojo.MarketingUserBehavior;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Created by Smexy on 2022/10/24
 *
 * <p>A demo Flink source that emits one randomly generated
 * {@link MarketingUserBehavior} record, serialized as a JSON string,
 * roughly every 500 ms until {@link #cancel()} is called.
 */
public class MySource implements SourceFunction<String>
{

        /** Candidate values for the second constructor argument of the record (behavior type). */
        private static final String[] BEHAVIORS = {"install", "uninstall", "download", "update"};

        /** Candidate values for the third constructor argument of the record (channel). */
        private static final String[] CHANNELS = {"ZTE", "VIVO", "OPPO", "MI", "HUAWEI"};

        /** Pause between two emitted records, in milliseconds. */
        private static final long EMIT_INTERVAL_MILLIS = 500L;

        /**
         * Loop guard for {@link #run(SourceContext)}. Volatile because
         * {@link #cancel()} is invoked from a different thread than the one
         * executing {@code run}.
         */
        private volatile boolean running = true;

        /**
         * Generates records and sends them downstream until the source is cancelled.
         *
         * @param ctx the context used to emit records downstream
         * @throws Exception if the emitting thread is interrupted while sleeping,
         *                   or if serialization/emission fails
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {

            while (running) {

                // Random id in [1, 10000), a random behavior, a random channel and
                // the current wall-clock time in milliseconds.
                MarketingUserBehavior marketingUserBehavior = new MarketingUserBehavior(
                    RandomUtils.nextLong(1L, 10000L),
                    BEHAVIORS[RandomUtils.nextInt(0, BEHAVIORS.length)],
                    CHANNELS[RandomUtils.nextInt(0, CHANNELS.length)],
                    System.currentTimeMillis()
                );

                // Throttle the stream before emitting the record.
                Thread.sleep(EMIT_INTERVAL_MILLIS);

                // Serialize the record to JSON and send it downstream.
                ctx.collect(JSON.toJSONString(marketingUserBehavior));
            }

        }

        /**
         * Called when the source is shut down. Clears the {@link #running} flag so
         * that the loop in {@link #run(SourceContext)} terminates instead of
         * producing records forever.
         */
        @Override
        public void cancel() {
            running = false;
        }

}
